package com.gearbox.platform.slurm.service;

import com.gearbox.core.cache.UserWaitingJobCache;
import com.gearbox.core.configuration.MetricConfiguration;
import com.gearbox.core.configuration.SystemConfiguration;
import com.gearbox.core.metric.CustomMetric;
import com.gearbox.core.metric.Metric;
import com.gearbox.core.metric.MetricCalculator;
import com.gearbox.core.model.Job;
import com.gearbox.core.service.JobService;
import com.gearbox.core.service.NodeService;
import com.gearbox.platform.slurm.cache.UserQuota;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * {@link MetricCalculator} implementation that is registered only when the
 * {@code system.type} property equals {@code slurm}.
 *
 * <p>The produced metric is a "workload" figure: the CPUs requested by cached waiting jobs,
 * capped per user by that user's remaining CPU quota, divided by
 * {@link SystemConfiguration#getCpu()} and rounded up. {@code getCpu()} is presumably the
 * CPU count of a single node, which would make the result a node count (to be confirmed
 * against {@code SystemConfiguration}).
 */
@Component
@ConditionalOnProperty(name = "system.type", havingValue = "slurm")
public class SlurmMetricCalculator implements MetricCalculator {
    private static final Logger LOG = LoggerFactory.getLogger(SlurmMetricCalculator.class);

    @Autowired
    private NodeService nodeService;

    @Autowired
    private JobService jobService;

    @Autowired
    private MetricConfiguration metricConfiguration;

    @Autowired
    private SystemConfiguration systemConfiguration;

    @Autowired
    private UserWaitingJobCache waitingJobCache;

    /**
     * Computes the current workload metric.
     *
     * <p>Steps: list the waiting jobs, derive each waiting user's remaining CPU quota from
     * the running jobs, refresh the waiting-job cache with the jobs whose owner still has
     * quota left, then compute the workload from the cache.
     *
     * @return a {@link CustomMetric} named after {@link MetricConfiguration#getName()} whose
     *         value is the computed workload
     */
    @Override
    public Metric calculateMetric() {
        List<Job> waitingJobs = jobService.listWaitingJobs();
        UserQuota userQuota = buildUserQuota(splitJobsByUser(waitingJobs));
        increaseWaitingTimeOfValidJob(waitingJobs, userQuota);
        int workLoad = calculateWorkLoad(userQuota);
        return new CustomMetric(metricConfiguration.getName(), BigDecimal.valueOf(workLoad));
    }

    /**
     * Sums, over every user present in the waiting-job cache, the smaller of the user's
     * remaining CPU quota and the CPUs requested by that user's cached jobs, then converts
     * the total with {@code ceil(total / systemConfiguration.getCpu())}.
     *
     * <p>A cached user without an entry in {@code userQuota} contributes nothing.
     *
     * @param userQuota remaining CPU quota per user
     * @return the rounded-up workload
     */
    int calculateWorkLoad(UserQuota userQuota) {
        LOG.info("calculate workload begin");
        waitingJobCache.printAllJobs();
        LOG.info("user left cpu quotas={}", userQuota);
        int schedulableCpu = 0;
        for (String user : waitingJobCache.getUsers()) {
            int requestedCpu = waitingJobCache.getJobsByUser(user).stream().mapToInt(Job::getCpus).sum();
            // The cache is keyed independently of userQuota, so the quota lookup must be
            // null-safe; a plain Math.max(lookup, 0) would unbox null and throw.
            schedulableCpu += Math.min(remainingQuotaOf(userQuota, user), requestedCpu);
        }
        // Cast to double first so the division is not truncated before rounding up.
        return (int) Math.ceil((double) schedulableCpu / systemConfiguration.getCpu());
    }

    /**
     * Returns the user's remaining quota clamped to be non-negative; a user with no quota
     * entry is treated as having none left.
     */
    private static int remainingQuotaOf(UserQuota userQuota, String user) {
        Integer quota = userQuota.getQuotasByUser(user);
        return quota == null ? 0 : Math.max(quota, 0);
    }

    /**
     * Builds the remaining CPU quota for every user that has at least one waiting job.
     *
     * @param userWithWaitingJobs waiting jobs grouped by user; only the keys are used
     * @return quota holder with one entry per waiting user
     */
    private UserQuota buildUserQuota(Map<String, List<Job>> userWithWaitingJobs) {
        Map<String, List<Job>> userWithRunningJobs = splitJobsByUser(jobService.listRunningJobs());
        UserQuota userQuota = new UserQuota();
        for (String user : userWithWaitingJobs.keySet()) {
            userQuota.set(user, calculateQuotasOfUser(user, userWithRunningJobs));
        }
        return userQuota;
    }

    /**
     * Computes how many CPUs the user may still consume: total quota minus the CPUs of the
     * user's running jobs, never less than zero.
     *
     * @param user user name
     * @param userWithRunningJobs running jobs grouped by user
     * @return remaining CPU quota, {@code >= 0}
     */
    private int calculateQuotasOfUser(String user, Map<String, List<Job>> userWithRunningJobs) {
        List<Job> runningJobs = userWithRunningJobs.getOrDefault(user, Collections.emptyList());
        int usedCpu = runningJobs.stream().mapToInt(Job::getCpus).sum();
        int totalCpuQuotas = getUserCpuQuotas(user);
        int leftCpuQuotas = Math.max(totalCpuQuotas - usedCpu, 0);
        LOG.info("usedCpu,totalCpu,leftCpu of user:[{}] = {}, {}, {}", user, usedCpu, totalCpuQuotas, leftCpuQuotas);
        return leftCpuQuotas;
    }

    /**
     * Passes to the waiting-job cache only those waiting jobs whose owner has a positive
     * remaining quota. Judging by the method name the cache tracks waiting time for these
     * jobs; the exact effect is defined by {@code UserWaitingJobCache#updateCache}.
     *
     * @param waitingJobs all currently waiting jobs
     * @param userQuota remaining CPU quota per user
     */
    private void increaseWaitingTimeOfValidJob(List<Job> waitingJobs, UserQuota userQuota) {
        List<Job> jobs = waitingJobs.stream()
            .filter(job -> remainingQuotaOf(userQuota, job.getUser()) > 0)
            .collect(Collectors.toList());
        waitingJobCache.updateCache(jobs);
    }

    /**
     * Groups jobs by {@link Job#getUser()}, keeping the input order within each group.
     *
     * @param jobs jobs to group
     * @return a new mutable map from user to that user's jobs
     */
    private Map<String, List<Job>> splitJobsByUser(List<Job> jobs) {
        Map<String, List<Job>> userWithJobs = new HashMap<>();
        for (Job job : jobs) {
            userWithJobs.computeIfAbsent(job.getUser(), user -> new ArrayList<>()).add(job);
        }
        return userWithJobs;
    }

    /**
     * Returns the user's total CPU quota: node quota multiplied by
     * {@link SystemConfiguration#getCpu()}.
     *
     * <p>Best effort: any failure is logged and reported as a quota of 0, so the user's
     * waiting jobs are simply not counted in this round.
     *
     * @param user user name
     * @return total CPU quota, or 0 when it cannot be determined
     */
    private int getUserCpuQuotas(String user) {
        try {
            return nodeService.getUserNodeQuotas(user) * systemConfiguration.getCpu();
        } catch (Exception e) {
            LOG.error("get user:[{}] cpu quotas failed", user, e);
            return 0;
        }
    }
}
